Spark数据倾斜解决方案
一 前言
本文介绍的是开发spark极其核心的地方,可以说懂得解决spark数据倾斜是区分一个spark工程师是否足够专业的标准,在开发中几乎天天面临这个问题。
二 原理以及现象
先来解释一下,出现什么现象的时候我们认定他为数据倾斜,以及数据倾斜发生的原理是什么?
比如一个spark任务中,绝大多数task任务运行速度很快,但是就是有那么几个task任务运行极其缓慢,慢慢的可能就接着报内存溢出的问题了,那么这个时候我们就可以认定是数据倾斜了。
接下来说一下发生数据倾斜的底层理论,其实可以非常肯定的说,数据倾斜就是发生在shuffle类的算子中,在进行shuffle的时候,必须将各个节点的相同的key拉到某个节点上的一个task来进行处理,比如按照key进行聚合和join操作等,这个时候其中某一个key数量特别大,于是就发生了数据倾斜
数据倾斜示意图
分组聚合逻辑中,需要把相同key的数据发往下游同一个task,如果某个或某几个key的数量特别大,则会导致下游的某个或某几个task所要处理的数据量特别大,也就是要处理的任务负载特别大
join计算中,A表和B表中相同key的数据,需要发往下游同一个task,如果A表中或B表中,某个key或某几个key的数量特别大,则会导致下游的某个或某几个task所要处理的数据量特别大,也就是要处理的任务负载特别大
三 数据倾斜的危害
当出现数据倾斜时,小量任务耗时远高于其它任务,从而使得整体耗时过大,未能充分发挥分布式系统的并行计算优势。
另外,当发生数据倾斜时,部分任务处理的数据量过大,可能造成内存不足使得任务失败,并进而引进整个应用失败。
四 定位数据倾斜的代码
上面我们知道了数据倾斜的底层原理,那么就好定位代码了,所以我就可以改写这段代码,让spark任务来正常运行了。
我们知道了导致数据倾斜的问题就是shuffle算子,所以我们先去找到代码中的shuffle的算子,比如distinct、groupBYkey、reduceBykey、aggergateBykey、join、cogroup、repartition等,那么问题一定就出现在这里。
找到shuffle类的算子之后,我们知道一个application分为job,那么一个job又划分为多个stage,stage的划分就是根据shuffle类的算子,也可以说是宽依赖来划分的,所以这个时候我们在spark UI界面上点击查看stage,如下图:
可以看到94这一行和91这一行,执行时间明显比其他的执行时间要长太多了,我们就可以肯定一定是这里发生了数据倾斜,然后我们就找到了发生数据倾斜的stage了,然后根据stage划分原理,我们就可以推算出来发生倾斜的那个stage对应的代码中的哪一部分了。
这个时候我们找到了数据倾斜发生的地方了,但是我们还需要知道到底是哪个key数据量特别大导致的数据倾斜,于是接下来来聊一聊这个问题。
找到这个key的算法,我们可以使用采样的方式
代码如下:
val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))
现在简单说一下原理,从所有key中,把其中每一个key随机取出来一部分,然后进行一个百分比的推算,学过采样算法的都知道,这是用局部取推算整体,虽然有点不准确,但是在整体概率上来说,我们只需要大概就可以定位那个最多的key了。
或者
df.select("key").sample(false, 0.1) // 数据采样
.(k => (k, 1)).reduceBykey(_ + _) // 统计 key 出现的次数
.map(k => (k._2, k._1)).sortByKey(false) // 根据 key 出现次数进行排序
.take(10) // 取前 10 个。
五 解决数据倾斜的方案
上面我们聊了数据倾斜发生的原理以及如何定位是哪个key发生了数据倾斜,这个时候我们就开始着手解决这个问题了,每个方案都有对应的情况,可以针对自己的情况来灵活运用。
解决方案一:过滤异常数据
空值或者异常值之类的,大多是这个原因引起
无效数据,大量重复的测试数据或是对结果影响不大的有效数据
解决方案二: 提高shuffle 的并行度。
原理很简单,我们知道在rduceBykey中有一个shuffle read task的值默认为200,也就是说用两百个task来处理任务,对于我们一个很大的集群来说,每个task的任务中需要处理的key也是比较多的,这个时候我们把这个数量给提高,比如设置reduceBYkey(1000),这个时候task的数量就多了,然后分配到每个task中的key就少了,于是说并行度就提高了。但是总体来说,这种解决办法对于某一个数量特别大的key来说效果甚为,只能说key多的时候,我们可以有一定的程度上环境数据倾斜的问题,所以这种方法也不是我们要找到的最好的办法,有一定的局限性。
解决方案三:两阶段聚合(加盐局部聚合+去盐全局聚合)
用这个方法就可以解决大部分聚合运算场景的数据倾斜。
假如我们有一个rdd,他的其中某一个key数量比较大,我们要进行shuffle的时候,速度比较慢。
比如这个key就是hello,他的条数已经有1万条。单一的进行shuffle肯定是耗时非常长。所以我们给他打上10以内的随机前缀,例如下面这种形式。
0_hello,1
1_hello,1
2_hello,1
3_hello,1
0_hello,1
2_hello,1
3_hello,1
.....
然后这个时候进行局部的预聚合,比如reduceBykey,于是经过局部的聚合后,我们得到了下面这种
0_hello,3000
1_hello,2000
2_hello,2500
3_hello,2500
然后在去掉之前加的随机前缀,在进行聚合,reduceBykey
hello,10000
于是通过这种把key进行拆分的方式,我们把key分配给了一些task去执行任务,经过实验数据表明,这种方法可以提高数倍效率
这里给出具体的一些代码,可以参考:
object WordCountAggTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(conf)
val array = Array("you you","you you","you you",
"you you",
"you you",
"you you",
"you you",
"jump jump")
val rdd = sc.parallelize(array,8)
rdd.flatMap( line => line.split(" "))
.map(word =>{
val prefix = (new util.Random).nextInt(3)
(prefix+"_"+word,1)
}).reduceByKey(_+_)
.map( wc =>{
val newWord=wc._1.split("_")(1)
val count=wc._2
(newWord,count)
}).reduceByKey(_+_)
.foreach( wc =>{
println("单词:"+wc._1 + " 次数:"+wc._2)
})
}
}
这种方法也是有局限性的,他适用于聚合类的shuffle操作,如果对于join操作,还是不行的,所以我们接着探究更深入的方法。
解决方案四:将reduce join转成map join
这种方法是有假定的前提的条件的,比如有两个rdd进行join操作,其中一个rdd的数据量不是很大,比如低于1个G的情况。
具体操作是就是选择两个rdd中那个比较数据量小的,然后我们把它拉到driver端,再然后通过广播变量(broadcast)的方式给他广播出去,这个时候再进行join 的话,因为数据都是在同一Executor中,所以shuffle 中不会有数据的传输,也就避免了shuffle带来的数据倾斜。
给出参考代码:
object MapJoinTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(conf)
val lista=Array(
Tuple2("001","小红"),
Tuple2("002","小明")
)
//数据量小一点
val listb=Array(
Tuple2("001","江西"),
Tuple2("002","山东")
)
val listaRDD = sc.parallelize(lista)
val listbRDD = sc.parallelize(listb)
//val result: RDD[(String, (String, String))] = listaRDD.join(listbRDD)
//设置广播变量
val listbBoradcast = sc.broadcast(listbRDD.collect())
listaRDD.map( tuple =>{
val key = tuple._1
val name = tuple._2
val map = listbBoradcast.value.toMap
val className = map.get(key)
(key,(name,className))
}).foreach( tuple =>{
println("省编码"+tuple._1 + " 姓名:"+tuple._2._1 + " 省份名称:"+tuple._2._2.get)
})
}
}
这种方法也是有局限性的,比如两个rdd都非常大,比如超过了10个G,这个时候我们就不能用这种方法了,因为数据量太大了,广播变量还是需要太大的消耗
解决方案五: 拆分 join 再 union
join因为热点值导致长尾,也可以将热点数据和非热点数据分开处理,最后合并
解决方案六: 大表 key 加盐,小表扩大 N 倍 join
如果出现数据倾斜的 Key 比较多,上一种方法将这些大量的倾斜 Key 分拆出来,意义不大。此时更适合直接对存在数据倾斜的数据集全部加上随机前缀,然后对另外一个不存在严重数据倾斜的数据集整体与随机前缀集作笛卡尔乘积(即将数据量扩大N倍)。
适用:一个数据集存在的倾斜 Key 比较多,另外一个数据集数据分布比较均匀。
缺点:需要将一个数据集整体扩大 N 倍,会增加资源消耗。
六 总结
数据倾斜是在shuffle中产生的,shuffle过程中造成了下游task的数据任务不均衡
数据倾斜常见的场景就是那些需要shuffle的场景,比如reduce,reducebykey,join,groupbykey…..
只要灵活运用上述思想,基本上能解决任何数据倾斜场景。文中代码示例是用spark,但是对于mapreduce,也是一样的思想;而至于hive,不用单独讨论,因为hive底层要么就是mapreduce,要么就是spark